import { ConnectionPool } from '@/core/connection-pool';
import { Producer } from '@/core/producer';
import { Consumer } from '@/core/consumer';
import { Message } from '@/core/message';
import { MessageHandler, QueueConfig, Logger } from '@/types';
import { createLogger } from '@/utils/logger';

/**
 * Work Queue pattern implementation
 * Distributes tasks among multiple workers
 */
export class WorkQueue {
  protected readonly connectionPool: ConnectionPool;
  protected readonly queueName: string;
  protected readonly queueConfig: QueueConfig;
  protected readonly producer: Producer;
  protected readonly logger: Logger;
  protected readonly workers: Consumer[] = [];

  constructor(
    connectionPool: ConnectionPool,
    queueName: string,
    queueConfig: Partial<QueueConfig> = {},
    logger?: Logger
  ) {
    this.connectionPool = connectionPool;
    this.queueName = queueName;
    this.queueConfig = {
      name: queueName,
      durable: true,
      exclusive: false,
      autoDelete: false,
      ...queueConfig,
    };
    this.producer = new Producer(connectionPool);
    this.logger = logger ?? createLogger('WorkQueue');
  }

  /**
   * Setup the queue
   */
  async setupQueue(): Promise<void> {
    let connection;
    try {
      connection = await this.connectionPool.getConnection();
      const channel = connection.channel;

      await channel.assertQueue(this.queueConfig.name, {
        durable: this.queueConfig.durable,
        exclusive: this.queueConfig.exclusive,
        autoDelete: this.queueConfig.autoDelete,
        arguments: this.queueConfig.arguments,
      });

      // 初始化 Producer
      await this.producer.initialize();

      this.logger.info(`Work queue ${this.queueName} setup completed`);
    } catch (error) {
      this.logger.error(`Failed to setup work queue ${this.queueName}`, error as Error);
      throw error;
    } finally {
      if (connection) {
        await this.connectionPool.returnConnection(connection);
      }
    }
  }

  /**
   * Send a task to the queue
   */
  async sendTask(taskData: unknown, priority = 0): Promise<void> {
    const message = new Message(taskData);
    message.addProperty('priority', priority);

    await this.producer.send(message, {
      routingKey: this.queueName,
      persistent: true,
      priority,
    });

    this.logger.debug(`Task sent to queue ${this.queueName}`, { messageId: message.id });
  }

  /**
   * Send multiple tasks in batch
   */
  async sendTasks(tasks: Array<{ data: unknown; priority?: number }>): Promise<void> {
    const messages = tasks.map(task => {
      const message = new Message(task.data);
      message.addProperty('priority', task.priority || 0);
      return message;
    });

    const results = await this.producer.sendBatch(messages, {
      routingKey: this.queueName,
      persistent: true,
    });

    const successCount = results.filter(r => r).length;
    this.logger.info(`Sent ${successCount}/${tasks.length} tasks to queue ${this.queueName}`);
  }

  /**
   * Start workers to process tasks
   */
  async startWorkers(
    handler: MessageHandler,
    workerCount = 1,
    options: {
      prefetchCount?: number;
      autoAck?: boolean;
    } = {}
  ): Promise<void> {
    for (let i = 0; i < workerCount; i++) {
      const worker = new Consumer(this.connectionPool, handler);

      // 初始化 Consumer
      await worker.initialize();

      await worker.startConsuming({
        queueName: this.queueName,
        prefetchCount: options.prefetchCount || 1,
        autoAck: options.autoAck || false,
      });

      this.workers.push(worker);
      this.logger.info(`Started worker ${i + 1}/${workerCount} for queue ${this.queueName}`);
    }
  }

  /**
   * Stop all workers
   */
  async stopWorkers(): Promise<void> {
    // 停止消费并关闭所有 workers
    await Promise.all(this.workers.map(async (worker) => {
      await worker.stopConsuming();
      await worker.close(); // 关闭 consumer 以释放连接
    }));
    this.workers.length = 0;
    this.logger.info(`Stopped all workers for queue ${this.queueName}`);
  }

  /**
   * Get queue statistics
   */
  async getQueueStats(): Promise<{
    messageCount: number;
    consumerCount: number;
  }> {
    let connection;
    try {
      connection = await this.connectionPool.getConnection();
      const channel = connection.channel;

      const queueInfo = await channel.checkQueue(this.queueName);

      return {
        messageCount: queueInfo.messageCount,
        consumerCount: queueInfo.consumerCount,
      };
    } catch (error) {
      this.logger.error(`Failed to get queue stats for ${this.queueName}`, error as Error);
      throw error;
    } finally {
      if (connection) {
        await this.connectionPool.returnConnection(connection);
      }
    }
  }

  /**
   * Purge the queue (remove all messages)
   */
  async purgeQueue(): Promise<number> {
    let connection;
    try {
      connection = await this.connectionPool.getConnection();
      const channel = connection.channel;

      const result = await channel.purgeQueue(this.queueName);
      this.logger.info(`Purged ${result.messageCount} messages from queue ${this.queueName}`);

      return result.messageCount;
    } catch (error) {
      this.logger.error(`Failed to purge queue ${this.queueName}`, error as Error);
      throw error;
    } finally {
      if (connection) {
        await this.connectionPool.returnConnection(connection);
      }
    }
  }

  /**
   * Delete the queue
   */
  async deleteQueue(options: { ifUnused?: boolean; ifEmpty?: boolean } = {}): Promise<number> {
    let connection;
    try {
      connection = await this.connectionPool.getConnection();
      const channel = connection.channel;

      const result = await channel.deleteQueue(this.queueName, options);
      this.logger.info(`Deleted queue ${this.queueName}, removed ${result.messageCount} messages`);

      return result.messageCount;
    } catch (error) {
      this.logger.error(`Failed to delete queue ${this.queueName}`, error as Error);
      throw error;
    } finally {
      if (connection) {
        await this.connectionPool.returnConnection(connection);
      }
    }
  }

  /**
   * Close the work queue
   */
  async close(): Promise<void> {
    await this.stopWorkers();
    await this.producer.close();
    this.logger.info(`Work queue ${this.queueName} closed`);
  }
}

/**
 * Priority Work Queue with priority support
 */
export class PriorityWorkQueue extends WorkQueue {
  private readonly maxPriority: number;

  constructor(
    connectionPool: ConnectionPool,
    queueName: string,
    maxPriority = 10,
    queueConfig: Partial<QueueConfig> = {},
    logger?: Logger
  ) {
    const priorityQueueConfig = {
      ...queueConfig,
      arguments: {
        ...queueConfig.arguments,
        'x-max-priority': maxPriority,
      },
    };

    super(connectionPool, queueName, priorityQueueConfig, logger);
    this.maxPriority = maxPriority;
  }

  /**
   * Send a high priority task
   */
  async sendHighPriorityTask(taskData: unknown): Promise<void> {
    await this.sendTask(taskData, this.maxPriority);
  }

  /**
   * Send a low priority task
   */
  async sendLowPriorityTask(taskData: unknown): Promise<void> {
    await this.sendTask(taskData, 1);
  }
}

/**
 * Delayed Work Queue with message delay support
 */
export class DelayedWorkQueue extends WorkQueue {
  constructor(
    connectionPool: ConnectionPool,
    queueName: string,
    queueConfig: Partial<QueueConfig> = {},
    logger?: Logger
  ) {
    const delayedQueueConfig = {
      ...queueConfig,
      arguments: {
        ...queueConfig.arguments,
        'x-message-ttl': 0, // Will be set per message
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': queueName,
      },
    };

    super(connectionPool, `${queueName}_delayed`, delayedQueueConfig, logger);
  }

  /**
   * Send a delayed task
   */
  async sendDelayedTask(taskData: unknown, delayMs: number): Promise<void> {
    const message = new Message(taskData);
    message.addProperty('expiration', delayMs.toString());

    await this.producer.send(message, {
      routingKey: this.queueName,
      persistent: true,
      expiration: delayMs,
    });

    this.logger.debug(`Delayed task sent to queue ${this.queueName}`, {
      messageId: message.id,
      delayMs,
    });
  }
}

/**
 * Utility functions for creating work queues
 */
export const createWorkQueue = (
  connectionPool: ConnectionPool,
  queueName: string,
  queueConfig?: Partial<QueueConfig>,
  logger?: Logger
): WorkQueue => {
  return new WorkQueue(connectionPool, queueName, queueConfig, logger);
};

export const createPriorityWorkQueue = (
  connectionPool: ConnectionPool,
  queueName: string,
  maxPriority = 10,
  queueConfig?: Partial<QueueConfig>,
  logger?: Logger
): PriorityWorkQueue => {
  return new PriorityWorkQueue(connectionPool, queueName, maxPriority, queueConfig, logger);
};

export const createDelayedWorkQueue = (
  connectionPool: ConnectionPool,
  queueName: string,
  queueConfig?: Partial<QueueConfig>,
  logger?: Logger
): DelayedWorkQueue => {
  return new DelayedWorkQueue(connectionPool, queueName, queueConfig, logger);
};
